package org.idea.mq.redis.framework.redis;

import org.idea.mq.redis.framework.config.IRedisFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author idea
 * @data 2020/4/1
 */
public class IRedisServiceImpl implements IRedisService {

    private static Logger log = LoggerFactory.getLogger(IRedisServiceImpl.class);

    private IRedisFactory iRedisFactory;

    public void setIRedisFactory(IRedisFactory iRedisFactory) {
        this.iRedisFactory = iRedisFactory;
    }


    @Override
    public String getStr(String key) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.get(key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Boolean setStr(String key, String value) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            jedis.set(key, value);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public long lpush(String listKey, String... value) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.lpush(listKey, value);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String rpop(String listKey) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.rpop(listKey);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<String> brpop(String listKey) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            //10秒超时
            return jedis.brpop(10, listKey);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            jedis.subscribe(jedisPubSub, channel);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean pSubscribe(JedisPubSub jedisPubSub, String pattern) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            jedis.psubscribe(jedisPubSub, new String[]{pattern});
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean publish(String channel, String content) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            jedis.publish(channel, content);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean xAdd(String streamName, Map<String, String> stringMap) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<byte[]> xRange(String streamName, String begin, String end, int limit) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            List<byte[]> result = jedis.xrange(streamName.getBytes(), begin.getBytes(), end.getBytes(), limit);
            return result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<Map.Entry<String, List<StreamEntry>>> xBlockRead(String streamName) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            XReadParams xReadParams = new XReadParams();
            xReadParams.block(0);
            Map<String, StreamEntryID> entry = new HashMap<>();
            entry.put(streamName, StreamEntryID.UNRECEIVED_ENTRY);
            return jedis.xread(xReadParams, entry);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String xGroup(String streamName, String groupName) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            //true表示如果stream不存在则内部创建
            return jedis.xgroupCreate(streamName, groupName, StreamEntryID.LAST_ENTRY, true);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String streamName, String groupName, String consumerName) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            XReadGroupParams xReadGroupParams = new XReadGroupParams();
            xReadGroupParams.block(0);
            xReadGroupParams.count(1);
            Map<String, StreamEntryID> entry = new HashMap<>(1);
            entry.put(streamName, StreamEntryID.UNRECEIVED_ENTRY);
            return jedis.xreadGroup(groupName, consumerName, xReadGroupParams, entry);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String streamName, String groupName, StreamEntryID streamEntryID, int count, String consumerName) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            XReadGroupParams xReadGroupParams = new XReadGroupParams();
            xReadGroupParams.block(0);
            xReadGroupParams.count(count);
            Map<String, StreamEntryID> entry = new HashMap<>(1);
            entry.put(streamName, streamEntryID);
            return jedis.xreadGroup(groupName, consumerName, xReadGroupParams, entry);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public long xreadGroup(String streamName, String groupName, StreamEntryID streamEntryID) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.xack(streamName, groupName,streamEntryID);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    @Override
    public List<StreamPendingEntry> xpending(String streamName, String groupName, StreamEntryID start, int count) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            XPendingParams xPendingParams = new XPendingParams();
            xPendingParams.count(count);
            xPendingParams.start(start);
            return jedis.xpending(streamName, groupName, xPendingParams);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<StreamConsumersInfo> xInfo(String streamName, String groupName) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.xinfoConsumers(streamName, groupName);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public long xack(String streamName, String groupName, StreamEntryID streamEntryID) {
        try (Jedis jedis = iRedisFactory.getConnection()) {
            return jedis.xack(streamName, groupName,streamEntryID);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
